package com.gjy.pulsar.disruptor;

import com.lmax.disruptor.RingBuffer;
import org.apache.pulsar.client.api.Message;

import java.util.Objects;

/**
 * Bridges Pulsar messages into an LMAX Disruptor {@link RingBuffer}: each forwarded
 * message is stored in a claimed {@link PulsarMessageEvent} slot and then published.
 *
 * @author gjy
 * @version 1.0
 * @since 2025-08-06 11:04:07
 */
public class PulsarToDisruptorBridge {

    private final RingBuffer<PulsarMessageEvent> ringBuffer;

    /**
     * Creates a bridge that publishes into the given ring buffer.
     *
     * @param ringBuffer the ring buffer that receives forwarded messages; must not be {@code null}
     * @throws NullPointerException if {@code ringBuffer} is {@code null}
     */
    public PulsarToDisruptorBridge(RingBuffer<PulsarMessageEvent> ringBuffer) {
        // Fail fast at construction instead of with an NPE on the first forwarded message.
        this.ringBuffer = Objects.requireNonNull(ringBuffer, "ringBuffer");
    }

    /**
     * Entry point for Pulsar message callbacks: claims the next ring buffer sequence,
     * injects the message into the event stored at that sequence, and publishes it.
     *
     * <p>NOTE(review): per the Disruptor documentation, {@code RingBuffer.next()} waits for
     * a free slot when the buffer is full — confirm that this back-pressure is acceptable
     * on the thread that invokes this method.
     *
     * @param message the Pulsar message to forward; must not be {@code null}
     * @throws NullPointerException if {@code message} is {@code null}
     */
    public void forwardToDisruptor(Message<byte[]> message) {
        // Validate before claiming a sequence so that a rejected call never occupies a slot
        // and never hands a null message to the event handlers.
        Objects.requireNonNull(message, "message");

        long sequence = ringBuffer.next();
        try {
            PulsarMessageEvent event = ringBuffer.get(sequence);
            event.setPulsarMessage(message); // inject the message into the claimed event
        } finally {
            // A claimed sequence is always published, even if populating the event throws.
            ringBuffer.publish(sequence);
        }
    }
}
